// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/base_endpoint_registry.h"
#include "ccf/service/tables/nodes.h"
#include "snapshots/filenames.h"

namespace ccf::node
{
  // Helper function to lookup redirect address based on the interface on this
  // node which received the request. Will either return an address, or
  // populate an appropriate error on the response context.
  // Takes both CommandEndpointContext and ReadOnlyTx, so that it can be
  // called by either read-only or read-write endpoints
  static std::optional<std::string> get_redirect_address_for_node(
    const ccf::endpoints::CommandEndpointContext& ctx,
    ccf::kv::ReadOnlyTx& ro_tx,
    const ccf::NodeId& target_node)
  {
    // Every failure below is reported the same way: a 500 InternalError is
    // written to the response and the caller receives an empty optional.
    const auto reject =
      [&ctx](std::string&& reason) -> std::optional<std::string> {
      ctx.rpc_ctx->set_error(
        HTTP_STATUS_INTERNAL_SERVER_ERROR,
        ccf::errors::InternalError,
        std::move(reason));
      return std::nullopt;
    };

    // 1) The target must have an entry in the nodes table
    const auto target_info =
      ro_tx.ro<ccf::Nodes>(ccf::Tables::NODES)->get(target_node);
    if (!target_info.has_value())
    {
      LOG_FAIL_FMT("Node redirection error: Unknown node {}", target_node);
      return reject(fmt::format(
        "Cannot find node info to produce redirect response for node {}",
        target_node));
    }

    // 2) The session must record which interface the request arrived on
    const auto receiving_interface =
      ctx.rpc_ctx->get_session_context()->interface_id;
    if (!receiving_interface.has_value())
    {
      LOG_FAIL_FMT("Node redirection error: Non-RPC request");
      return reject("Cannot redirect non-RPC request");
    }

    // 3) The target must expose an RPC interface with the same ID; its
    // published address is what the client gets redirected to
    const auto& target_interfaces = target_info->rpc_interfaces;
    const auto match = target_interfaces.find(*receiving_interface);
    if (match != target_interfaces.end())
    {
      return match->second.published_address;
    }

    LOG_FAIL_FMT(
      "Node redirection error: Target missing interface {}",
      *receiving_interface);
    return reject(fmt::format(
      "Cannot redirect request. Received on RPC interface {}, which is "
      "not present on target node {}",
      *receiving_interface,
      target_node));
  }

  // Installs the endpoints used to discover and download committed snapshot
  // files from this node:
  //
  //  - HEAD/GET /snapshot[?since=N]
  //      Responds with a permanent redirect. If this node cannot replicate
  //      and a primary is known, the Location header points at the primary's
  //      /node/snapshot. Otherwise it points at
  //      /node/snapshot/{snapshot_name} on this node, for the latest committed
  //      snapshot found in the configured snapshot directory.
  //
  //  - HEAD/GET /snapshot/{snapshot_name}
  //      HEAD reports the file size in Content-Length. GET returns the
  //      requested byte range (or the whole file when no Range header is
  //      given) with a Content-Range header.
  //
  // All four endpoints are registered with no_auth_required, are never
  // forwarded, are hidden from OpenAPI, and require the SnapshotRead operator
  // feature.
  //
  // The handlers capture node_context by reference, so it must remain valid
  // for as long as the installed handlers can be invoked.
  static void init_file_serving_handlers(
    ccf::BaseEndpointRegistry& registry, ccf::AbstractNodeContext& node_context)
  {
    // Name of the optional query parameter accepted by /snapshot
    static constexpr auto snapshot_since_param_key = "since";

    // Redirect to endpoint for a single specific snapshot
    auto find_snapshot = [&](ccf::endpoints::ReadOnlyEndpointContext& ctx) {
      size_t latest_idx = 0;
      {
        // Get latest_idx from query param, if present
        const auto parsed_query =
          http::parse_query(ctx.rpc_ctx->get_request_query());

        std::string error_reason;
        auto snapshot_since = http::get_query_value_opt<ccf::SeqNo>(
          parsed_query, snapshot_since_param_key, error_reason);

        // NOTE(review): error_reason is only inspected when a value was
        // returned. If get_query_value_opt yields nullopt for a malformed
        // value, such a request is treated as if `since` were absent -
        // confirm against the helper's contract.
        if (snapshot_since.has_value())
        {
          if (!error_reason.empty())
          {
            ctx.rpc_ctx->set_error(
              HTTP_STATUS_BAD_REQUEST,
              ccf::errors::InvalidQueryParameterValue,
              std::move(error_reason));
            return;
          }
          latest_idx = snapshot_since.value();
        }
      }

      auto node_operation = node_context.get_subsystem<AbstractNodeOperation>();
      if (node_operation == nullptr)
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_INTERNAL_SERVER_ERROR,
          ccf::errors::InternalError,
          "Unable to access NodeOperation subsystem");
        return;
      }

      if (!node_operation->can_replicate())
      {
        // Try to redirect to primary for preferable snapshot, expected to
        // match later /join request
        auto primary_id = node_operation->get_primary();
        if (primary_id.has_value())
        {
          const auto address =
            get_redirect_address_for_node(ctx, ctx.tx, *primary_id);
          if (!address.has_value())
          {
            // Error response has already been populated by the helper
            return;
          }

          // Preserve the caller's `since` constraint across the redirect
          auto location =
            fmt::format("https://{}/node/snapshot", address.value());
          if (latest_idx != 0)
          {
            location +=
              fmt::format("?{}={}", snapshot_since_param_key, latest_idx);
          }

          ctx.rpc_ctx->set_response_header(http::headers::LOCATION, location);
          ctx.rpc_ctx->set_error(
            HTTP_STATUS_PERMANENT_REDIRECT,
            ccf::errors::NodeCannotHandleRequest,
            "Node is not primary; redirecting for preferable snapshot");
          return;
        }

        // If there is no current primary, fall-back to returning this
        // node's best snapshot rather than terminating the fetch with an
        // error
      }

      auto node_configuration_subsystem =
        node_context.get_subsystem<NodeConfigurationSubsystem>();
      if (node_configuration_subsystem == nullptr)
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_INTERNAL_SERVER_ERROR,
          ccf::errors::InternalError,
          "NodeConfigurationSubsystem is not available");
        return;
      }

      const auto& snapshots_config =
        node_configuration_subsystem->get().node_config.snapshots;

      // latest_idx is passed to the directory search below; keep a copy of
      // the requested value for the error message
      const auto orig_latest = latest_idx;
      auto latest_committed_snapshot =
        snapshots::find_latest_committed_snapshot_in_directory(
          snapshots_config.directory, latest_idx);

      if (!latest_committed_snapshot.has_value())
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_NOT_FOUND,
          ccf::errors::ResourceNotFound,
          fmt::format(
            "This node has no committed snapshots since {}", orig_latest));
        return;
      }

      const auto& snapshot_path = latest_committed_snapshot.value();

      const auto address =
        get_redirect_address_for_node(ctx, ctx.tx, node_context.get_node_id());
      if (!address.has_value())
      {
        // Error response has already been populated by the helper
        return;
      }

      auto redirect_url = fmt::format(
        "https://{}/node/snapshot/{}", address.value(), snapshot_path);
      LOG_DEBUG_FMT("Redirecting to snapshot: {}", redirect_url);
      ctx.rpc_ctx->set_response_header(
        ccf::http::headers::LOCATION, redirect_url);
      ctx.rpc_ctx->set_response_status(HTTP_STATUS_PERMANENT_REDIRECT);
    };
    registry
      .make_read_only_endpoint(
        "/snapshot", HTTP_HEAD, find_snapshot, no_auth_required)
      .set_forwarding_required(endpoints::ForwardingRequired::Never)
      .add_query_parameter<ccf::SeqNo>(
        snapshot_since_param_key, ccf::endpoints::OptionalParameter)
      .set_openapi_hidden(true)
      .require_operator_feature(endpoints::OperatorFeature::SnapshotRead)
      .install();
    registry
      .make_read_only_endpoint(
        "/snapshot", HTTP_GET, find_snapshot, no_auth_required)
      .set_forwarding_required(endpoints::ForwardingRequired::Never)
      .add_query_parameter<ccf::SeqNo>(
        snapshot_since_param_key, ccf::endpoints::OptionalParameter)
      .set_openapi_hidden(true)
      .require_operator_feature(endpoints::OperatorFeature::SnapshotRead)
      .install();

    // Serve (a byte range of) a single named snapshot file
    auto get_snapshot = [&](ccf::endpoints::CommandEndpointContext& ctx) {
      auto node_configuration_subsystem =
        node_context.get_subsystem<NodeConfigurationSubsystem>();
      if (node_configuration_subsystem == nullptr)
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_INTERNAL_SERVER_ERROR,
          ccf::errors::InternalError,
          "NodeConfigurationSubsystem is not available");
        return;
      }

      const auto& snapshots_config =
        node_configuration_subsystem->get().node_config.snapshots;

      std::string snapshot_name;
      std::string error;
      if (!ccf::endpoints::get_path_param(
            ctx.rpc_ctx->get_request_path_params(),
            "snapshot_name",
            snapshot_name,
            error))
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_BAD_REQUEST,
          ccf::errors::InvalidResourceName,
          std::move(error));
        return;
      }

      // The name comes from an unauthenticated request and is joined onto the
      // snapshot directory below. Only accept a plain file name, so the
      // resulting path cannot refer to anything outside that directory.
      if (
        snapshot_name == "." || snapshot_name == ".." ||
        snapshot_name.find('/') != std::string::npos)
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_BAD_REQUEST,
          ccf::errors::InvalidResourceName,
          fmt::format("Invalid snapshot name {}", snapshot_name));
        return;
      }

      files::fs::path snapshot_path =
        files::fs::path(snapshots_config.directory) / snapshot_name;

      std::ifstream f(snapshot_path, std::ios::binary);
      if (!f.good())
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_NOT_FOUND,
          ccf::errors::ResourceNotFound,
          fmt::format(
            "This node does not have a snapshot named {}", snapshot_name));
        return;
      }

      LOG_DEBUG_FMT("Found snapshot: {}", snapshot_path.string());

      // Determine the file size by seeking to the end. tellg() reports
      // failure as -1, which must not be reinterpreted as a huge size.
      f.seekg(0, f.end);
      const auto end_pos = f.tellg();
      if (end_pos == std::ifstream::pos_type(-1))
      {
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_INTERNAL_SERVER_ERROR,
          ccf::errors::InternalError,
          fmt::format(
            "Unable to determine size of snapshot {}", snapshot_name));
        return;
      }
      const auto total_size = static_cast<size_t>(end_pos);

      ctx.rpc_ctx->set_response_header("accept-ranges", "bytes");

      ctx.rpc_ctx->set_response_header(
        ccf::http::headers::CCF_SNAPSHOT_NAME, snapshot_name);

      if (ctx.rpc_ctx->get_request_verb() == HTTP_HEAD)
      {
        // HEAD only reports metadata; no body is read or returned
        ctx.rpc_ctx->set_response_status(HTTP_STATUS_OK);
        ctx.rpc_ctx->set_response_header(
          ccf::http::headers::CONTENT_LENGTH, total_size);
        return;
      }

      // Parses the whole of `text` as an unsigned decimal number. Fails on
      // empty input, overflow, or any trailing non-digit characters.
      const auto parse_size = [](const auto& text, size_t& value) {
        const char* const first = text.data();
        const char* const last = first + text.size();
        const auto [end_of_number, ec] = std::from_chars(first, last, value);
        return ec == std::errc() && end_of_number == last;
      };

      // Requested range is [range_start, range_end). Defaults to whole file.
      size_t range_start = 0;
      size_t range_end = total_size;
      {
        const auto range_header = ctx.rpc_ctx->get_request_header("range");
        if (range_header.has_value())
        {
          LOG_TRACE_FMT("Parsing range header {}", range_header.value());

          auto [unit, ranges] = ccf::nonstd::split_1(range_header.value(), "=");
          if (unit != "bytes")
          {
            ctx.rpc_ctx->set_error(
              HTTP_STATUS_BAD_REQUEST,
              ccf::errors::InvalidHeaderValue,
              "Only 'bytes' is supported as a Range header unit");
            return;
          }

          if (ranges.find(',') != std::string::npos)
          {
            ctx.rpc_ctx->set_error(
              HTTP_STATUS_BAD_REQUEST,
              ccf::errors::InvalidHeaderValue,
              "Multiple ranges are not supported");
            return;
          }

          const auto segments = ccf::nonstd::split(ranges, "-");
          if (segments.size() != 2)
          {
            ctx.rpc_ctx->set_error(
              HTTP_STATUS_BAD_REQUEST,
              ccf::errors::InvalidHeaderValue,
              fmt::format(
                "Invalid format, cannot parse range in {}",
                range_header.value()));
            return;
          }

          const auto s_range_start = segments[0];
          const auto s_range_end = segments[1];

          if (!s_range_start.empty())
          {
            if (!parse_size(s_range_start, range_start))
            {
              ctx.rpc_ctx->set_error(
                HTTP_STATUS_BAD_REQUEST,
                ccf::errors::InvalidHeaderValue,
                fmt::format(
                  "Unable to parse start of range value {} in {}",
                  s_range_start,
                  range_header.value()));
              return;
            }

            if (range_start > total_size)
            {
              ctx.rpc_ctx->set_error(
                HTTP_STATUS_BAD_REQUEST,
                ccf::errors::InvalidHeaderValue,
                fmt::format(
                  "Start of range {} is larger than total file size {}",
                  range_start,
                  total_size));
              return;
            }

            if (!s_range_end.empty())
            {
              // Fully-specified range, like "X-Y"
              if (!parse_size(s_range_end, range_end))
              {
                ctx.rpc_ctx->set_error(
                  HTTP_STATUS_BAD_REQUEST,
                  ccf::errors::InvalidHeaderValue,
                  fmt::format(
                    "Unable to parse end of range value {} in {}",
                    s_range_end,
                    range_header.value()));
                return;
              }

              if (range_end > total_size)
              {
                LOG_DEBUG_FMT(
                  "Requested snapshot range ending at {}, but file size is "
                  "only {} - shrinking range end",
                  range_end,
                  total_size);
                range_end = total_size;
              }

              if (range_end < range_start)
              {
                ctx.rpc_ctx->set_error(
                  HTTP_STATUS_BAD_REQUEST,
                  ccf::errors::InvalidHeaderValue,
                  fmt::format(
                    "Invalid range: Start ({}) and end ({}) out of order",
                    range_start,
                    range_end));
                return;
              }
            }
            else
            {
              // Else this is an open-ended range like "X-"
              range_end = total_size;
            }
          }
          else
          {
            if (!s_range_end.empty())
            {
              // Negative (suffix) range, like "-Y": the final Y bytes
              size_t suffix_length = 0;
              if (!parse_size(s_range_end, suffix_length))
              {
                ctx.rpc_ctx->set_error(
                  HTTP_STATUS_BAD_REQUEST,
                  ccf::errors::InvalidHeaderValue,
                  fmt::format(
                    "Unable to parse end of range offset value {} in {}",
                    s_range_end,
                    range_header.value()));
                return;
              }

              // A suffix longer than the file selects the entire file.
              // Without this clamp the subtraction below would wrap around,
              // producing an out-of-bounds start and a caller-controlled
              // allocation size.
              if (suffix_length > total_size)
              {
                LOG_DEBUG_FMT(
                  "Requested final {} bytes of snapshot, but file size is "
                  "only {} - returning entire file",
                  suffix_length,
                  total_size);
                suffix_length = total_size;
              }

              range_end = total_size;
              range_start = total_size - suffix_length;
            }
            else
            {
              ctx.rpc_ctx->set_error(
                HTTP_STATUS_BAD_REQUEST,
                ccf::errors::InvalidHeaderValue,
                "Invalid range: Must contain range-start or range-end");
              return;
            }
          }
        }
      }

      // At this point 0 <= range_start <= range_end <= total_size
      const auto range_size = range_end - range_start;

      LOG_TRACE_FMT(
        "Reading {}-byte range from {} to {}",
        range_size,
        range_start,
        range_end);

      // Read requested range into buffer
      std::vector<uint8_t> contents(range_size);
      f.seekg(range_start);
      f.read(reinterpret_cast<char*>(contents.data()), contents.size());
      if (!f)
      {
        // A short or failed read would otherwise return a zero-padded body
        ctx.rpc_ctx->set_error(
          HTTP_STATUS_INTERNAL_SERVER_ERROR,
          ccf::errors::InternalError,
          fmt::format(
            "Unable to read requested range of snapshot {}", snapshot_name));
        return;
      }
      f.close();

      // Build successful response
      // NOTE(review): Partial Content is returned even when no Range header
      // was sent - confirm whether clients rely on this before changing it.
      ctx.rpc_ctx->set_response_status(HTTP_STATUS_PARTIAL_CONTENT);
      ctx.rpc_ctx->set_response_header(
        ccf::http::headers::CONTENT_TYPE,
        ccf::http::headervalues::contenttype::OCTET_STREAM);
      ctx.rpc_ctx->set_response_body(std::move(contents));

      // Partial Content responses describe the current response in
      // Content-Range
      // NOTE(review): range_end is treated as exclusive throughout this
      // handler (range_size = range_end - range_start), and is reported
      // as-is here. RFC 9110 byte positions are inclusive - confirm against
      // existing clients before changing this convention.
      ctx.rpc_ctx->set_response_header(
        ccf::http::headers::CONTENT_RANGE,
        fmt::format("bytes {}-{}/{}", range_start, range_end, total_size));
    };
    registry
      .make_command_endpoint(
        "/snapshot/{snapshot_name}", HTTP_HEAD, get_snapshot, no_auth_required)
      .set_forwarding_required(endpoints::ForwardingRequired::Never)
      .set_openapi_hidden(true)
      .require_operator_feature(endpoints::OperatorFeature::SnapshotRead)
      .install();
    registry
      .make_command_endpoint(
        "/snapshot/{snapshot_name}", HTTP_GET, get_snapshot, no_auth_required)
      .set_forwarding_required(endpoints::ForwardingRequired::Never)
      .set_openapi_hidden(true)
      .require_operator_feature(endpoints::OperatorFeature::SnapshotRead)
      .install();
  }
}